﻿#include "http_base_impl.hh"
#include "../../src/repo/src/include/root/coroutine/coroutine.h"
#include "http_parser/http_parser.h"
#include "klogger/interface/logger.h"
#include "../box/box_channel.hh"
#include "../box/service_box.hh"
#include "../config/box_config.hh"
#include "../detail/box_config_impl.hh"
#include "../detail/http_data.hh"
#include "../detail/lang_impl.hh"
#include "../util/object_pool.hh"
#include "../util/string_util.hh"
#include "../util/time_util.hh"
#include <vector>

kratos::http::HttpBaseImpl::HttpBaseImpl(service::ServiceBox *box) {
  box_ = box;
}

kratos::http::HttpBaseImpl::~HttpBaseImpl() {
  // 兼容测试用例：构造的时候box为空。正式运行时Http作为组件挂载在box之下。
  if (!box_) {
    return;
  }
  for (const auto &info : coro_listener_map_) {
    if ((info.second.coro_id != 0) && (info.second.coro_id != coro_id())) {
      // 非当前协程全部关闭
      coro_close(info.second.coro_id);
    }
  }
  for (const auto &info : coro_connector_map_) {
    if ((info.second.coro_id != 0) && (info.second.coro_id != coro_id())) {
      // 非当前协程全部关闭
      coro_close(info.second.coro_id);
    }
  }
  box_ = nullptr;
}

auto kratos::http::HttpBaseImpl::do_request_async(
    const std::string &host, int port, const std::string &uri,
    const std::string &method, const HeaderMap &headers,
    const std::string &content, int timeout, std::uint64_t user_data,
    ResponseHandler handler) -> bool {
  // 生成一个新的连接器名字
  std::string name = gen_connector_name();
  if (!connect_to(name, host, port, timeout)) {
    if (box_) {
      box_->write_log(lang::LangID::LANG_HTTP_CREATE_CONNECTOR_FAIL,
                      klogger::Logger::WARNING, host.c_str(), port);
    }
    // 连接失败
    return false;
  }
  auto request = make_shared_pool_ptr<HttpRequest>();
  call_map_[name] = request;
  request->set_handler(handler);
  request->set_method(method);
  request->set_uri(uri);
  request->set_host(host);
  request->set_port(port);
  request->set_uri(uri);
  request->set_base(this);
  request->get_settings()->on_body = &HttpBaseImpl::on_body;
  request->get_settings()->on_chunk_complete = &HttpBaseImpl::on_chunk_complete;
  request->get_settings()->on_chunk_header = &HttpBaseImpl::on_chunk_header;
  request->get_settings()->on_headers_complete =
      &HttpBaseImpl::on_headers_complete;
  request->get_settings()->on_header_field = &HttpBaseImpl::on_header_field;
  request->get_settings()->on_header_value = &HttpBaseImpl::on_header_value;
  request->get_settings()->on_message_begin = &HttpBaseImpl::on_message_begin;
  request->get_settings()->on_message_complete =
      &HttpBaseImpl::on_message_complete;
  request->get_settings()->on_status = &HttpBaseImpl::on_status;
  request->get_settings()->on_url = &HttpBaseImpl::on_url;
  request->set_headers(headers);
  request->set_content(content);
  request->set_user_data(user_data);
  return true;
}

// TODO timeout到时如果连接不上返回nullptr
auto kratos::http::HttpBaseImpl::do_request_co(const std::string &host,
                                               int port, const std::string &uri,
                                               const std::string &method,
                                               const HeaderMap &headers,
                                               const std::string &content,
                                               int timeout) -> HttpCallPtr {
  if (coro_is_main()) {
    // 不能在主协程调用
    return HttpCallPtr();
  }
  if (!timeout) {
    timeout = (int)get_http_max_call_timeout();
  }
  std::string name = gen_connector_name();
  if (!connect_to(name, host, port, timeout)) {
    if (box_) {
      box_->write_log(lang::LangID::LANG_HTTP_CREATE_CONNECTOR_FAIL,
                      klogger::Logger::WARNING, host.c_str(), port);
    }
    return HttpCallPtr();
  }
  auto request = make_shared_pool_ptr<HttpRequest>();
  call_map_[name] = request;
  request->set_handler(nullptr);
  request->set_method(method);
  request->set_uri(uri);
  request->set_host(host);
  request->set_port(port);
  request->set_uri(uri);
  request->set_base(this);
  request->get_settings()->on_body = &HttpBaseImpl::on_body;
  request->get_settings()->on_chunk_complete = &HttpBaseImpl::on_chunk_complete;
  request->get_settings()->on_chunk_header = &HttpBaseImpl::on_chunk_header;
  request->get_settings()->on_headers_complete =
      &HttpBaseImpl::on_headers_complete;
  request->get_settings()->on_header_field = &HttpBaseImpl::on_header_field;
  request->get_settings()->on_header_value = &HttpBaseImpl::on_header_value;
  request->get_settings()->on_message_begin = &HttpBaseImpl::on_message_begin;
  request->get_settings()->on_message_complete =
      &HttpBaseImpl::on_message_complete;
  request->get_settings()->on_status = &HttpBaseImpl::on_status;
  request->get_settings()->on_url = &HttpBaseImpl::on_url;
  request->set_headers(headers);
  request->set_content(content);
  auto now = util::get_os_time_millionsecond();
  add_connector_co(
      name, {coro_id(), util::get_os_time_millionsecond() +
                            static_cast<std::time_t>(std::time_t(timeout) *
                                                     std::time_t(1000))});
  // 挂起协程等待
  request->run_coro();
  remove_connector_co(name);
  auto it = call_map_.find(name);
  if (it != call_map_.end()) {
    if (it->second->is_msg_complete()) {
      return it->second;
    }
  }
  return HttpCallPtr();
}

auto kratos::http::HttpBaseImpl::wait_request_async(const std::string &host,
                                                    int port,
                                                    std::uint64_t user_data,
                                                    RequestHandler handler)
    -> bool {
  std::string name;
  if (!is_listener_exists(host, port)) {
    name = gen_client_name();
    if (!listen_at(name, host, port)) {
      if (box_) {
        box_->write_log(lang::LangID::LANG_HTTP_CREATE_CONNECTOR_FAIL,
                        klogger::Logger::WARNING, host.c_str(), port);
      }
      return false;
    } else {
      listener_list_.push_back({host, name, port});
    }
  } else {
    // 监听器已经存在
    name = get_listener_name(host, port);
  }
  listen_handler_map_[name] = {handler, user_data};
  return true;
}

auto kratos::http::HttpBaseImpl::wait_request_co(const std::string &host,
                                                 int port) -> HttpCallPtr {
  if (coro_is_main()) {
    // 不能在主协程调用
    return HttpCallPtr();
  }
  std::string name;
  if (!is_listener_exists(host, port)) {
    name = gen_client_name();
    if (!listen_at(name, host, port)) {
      if (box_) {
        box_->write_log(lang::LangID::LANG_HTTP_CREATE_CONNECTOR_FAIL,
                        klogger::Logger::WARNING, host.c_str(), port);
      }
      return HttpCallPtr();
    } else {
      listener_list_.push_back({host, name, port});
    }
  } else {
    return HttpCallPtr();
  }
  coro_listener_map_[name] = {HttpCallPtr(), coro_id()};
  // 出让CPU
  coro_yield();
  // 唤醒后查看是否有返回
  auto it = coro_listener_map_.find(name);
  if (it == coro_listener_map_.end()) {
    return HttpCallPtr();
  } else {
    return it->second.call;
  }
}

auto kratos::http::HttpBaseImpl::clean_timeout_call(std::time_t ms) -> void {
  for (auto it = non_finish_map_.begin(); it != non_finish_map_.end();) {
    if (it->second.call.expired()) {
      it = non_finish_map_.erase(it);
    } else {
      if (it->second.dead_line < ms) {
        // TODO timeout
        // 强制关闭管道
        it->second.call.lock()->close();
      }
      it++;
    }
  }
}

auto kratos::http::HttpBaseImpl::get_coro_listener_map() -> CoroListenerMap & {
  return coro_listener_map_;
}

auto kratos::http::HttpBaseImpl::is_listener_exists(const std::string &host,
                                                    int port) -> bool {
  for (const auto &info : listener_list_) {
    if (host == info.host && port == info.port) {
      return true;
    }
  }
  return false;
}

auto kratos::http::HttpBaseImpl::get_listener_name(const std::string &host,
                                                   int port)
    -> const std::string & {
  static std::string null_name;
  for (const auto &info : listener_list_) {
    if (host == info.host && port == info.port) {
      return info.listener_name;
    }
  }
  return null_name;
}

auto kratos::http::HttpBaseImpl::write_log_line(int level,
                                                const std::string &log_line)
    -> void {
  if (box_) {
    box_->write_log_line(level, log_line);
  }
}

auto kratos::http::HttpBaseImpl::add_connector_co(
    const std::string &name, const HttpBaseImpl::ConnectorInfo &info) -> void {
  coro_connector_map_.insert(std::make_pair(name, info));
}

auto kratos::http::HttpBaseImpl::remove_connector_co(const std::string &name)
    -> void {
  coro_connector_map_.erase(name);
}

auto kratos::http::HttpBaseImpl::check_non_finish_connector_co(std::time_t ms)
    -> void {
  for (auto it = coro_connector_map_.begin();
       it != coro_connector_map_.end();) {
    if (it->second.dead_line < ms) {
      auto coro_id = it->second.coro_id;
      it = coro_connector_map_.erase(it);
      coro_resume(coro_id);
      // NOTICE 一帧销毁一个，防止另外一个协程也改变这个容器
      return;
    } else {
      it++;
    }
  }
}

auto kratos::http::HttpBaseImpl::get_http_max_call_timeout() -> std::time_t {
  if (box_) {
    return static_cast<std::time_t>(
        box_->get_config().get_http_max_call_timeout());
  } else {
    return DEFAULT_MAX_TIMEOUT;
  }
}

auto kratos::http::HttpBaseImpl::update(std::time_t ms) -> void {
  if (!ms) {
    ms = util::get_os_time_millionsecond();
  }
  BoxNetwork::update();
  // 检查未完成的调用是否超时
  clean_timeout_call(ms);
  // 检查未完成的协程内连接器是否超时
  check_non_finish_connector_co(ms);
  return;
}

auto kratos::http::HttpBaseImpl::get_config() -> kratos::config::BoxConfig & {
  if (!box_) {
    // 为测试用例准备
    static config::BoxConfigImpl default_config(nullptr);
    return default_config;
  }
  return box_->get_config();
}

auto kratos::http::HttpBaseImpl::get_logger_appender() -> klogger::Appender * {

  if (!box_) {
    return nullptr;
  }

  return box_->get_logger_appender();
}

auto kratos::http::HttpBaseImpl::get_lang() -> lang::Lang * {

  if (!box_) {
    return nullptr;
  }

  return box_->get_lang();
}

void kratos::http::HttpBaseImpl::on_listen(
    const std::string &name, bool success,
    std::shared_ptr<service::BoxChannel> &channel) {
  if (!success) {
    // 监听器启动失败
    listen_handler_map_.erase(name);
    auto coro_listener_it = coro_listener_map_.find(name);
    if (coro_listener_it != coro_listener_map_.end()) {
      // 唤醒监听器协程，返回失败
      coro_resume(coro_listener_it->second.coro_id);
      coro_listener_map_.erase(coro_listener_it);
    }
    for (auto it = listener_list_.begin(); it != listener_list_.end(); it++) {
      if (it->listener_name == name) {
        if (box_) {
          box_->write_log(lang::LangID::LANG_HTTP_CREATE_CONNECTOR_FAIL,
                          klogger::Logger::WARNING, it->host.c_str(), it->port);
        }
        listener_list_.erase(it);
        return;
      }
    }
  }
}

void kratos::http::HttpBaseImpl::on_accept(
    std::shared_ptr<service::BoxChannel> &channel) {
  auto response = make_shared_pool_ptr<HttpResponse>();
  std::string client_name = gen_client_name();
  call_map_[client_name] = response;
  // 设置协程监听器名称
  channel->set_coro_listener_name(channel->get_channel_name());
  // 查找监听器
  auto it = listen_handler_map_.find(channel->get_channel_name());
  if (it != listen_handler_map_.end()) {
    // 设置监听器回调
    response->set_handler(it->second.handler);
    response->set_user_data(it->second.user_data);
  }
  // 设置客户端名称
  channel->set_channel_name(client_name);
  response->set_base(this);
  response->set_channel_id(channel->get_id());
  response->get_settings()->on_body = &HttpBaseImpl::on_body;
  response->get_settings()->on_chunk_complete =
      &HttpBaseImpl::on_chunk_complete;
  response->get_settings()->on_chunk_header = &HttpBaseImpl::on_chunk_header;
  response->get_settings()->on_headers_complete =
      &HttpBaseImpl::on_headers_complete;
  response->get_settings()->on_header_field = &HttpBaseImpl::on_header_field;
  response->get_settings()->on_header_value = &HttpBaseImpl::on_header_value;
  response->get_settings()->on_message_begin = &HttpBaseImpl::on_message_begin;
  response->get_settings()->on_message_complete =
      &HttpBaseImpl::on_message_complete;
  response->get_settings()->on_status = &HttpBaseImpl::on_status;
  response->get_settings()->on_url = &HttpBaseImpl::on_url;
}

void kratos::http::HttpBaseImpl::on_connect(
    const std::string &name, bool success,
    std::shared_ptr<service::BoxChannel> &channel) {
  auto it = call_map_.find(name);
  if (!success) {
    if (channel) {
      channel->close();
    } else {
      if (it != call_map_.end()) {
        // 连接失败
        auto http_request = std::dynamic_pointer_cast<HttpRequest>(it->second);
        if (http_request) {
          if (http_request->get_handler()) {
            // 通知连接失败
            http_request->get_handler()(std::weak_ptr<HttpCall>(), http_request->get_user_data());
          }
        }
        call_map_.erase(it);
      }
    }
  } else {
    if (it != call_map_.end()) {
      it->second->set_channel_id(channel->get_id());
      it->second->send_request();
    }
  }
}

void kratos::http::HttpBaseImpl::on_close(
    std::shared_ptr<service::BoxChannel> &channel) {
  // TODO timeout
  call_map_.erase(channel->get_channel_name());
  non_finish_map_.erase(channel->get_channel_name());
}

void kratos::http::HttpBaseImpl::on_data(
    std::shared_ptr<service::BoxChannel> &channel) {
  auto it = call_map_.find(channel->get_channel_name());
  if (it == call_map_.end()) {
    return;
  }
  auto *call = it->second.get();
  auto bytes_need_read = channel->size();
  auto *buffer = call->get_resized_buffer(bytes_need_read);
  auto cur_pos = call->get_buffer_len();
  auto recv_bytes =
      channel->recv(buffer + call->get_buffer_len(), channel->size());
  if (recv_bytes != bytes_need_read) {
    channel->close();
    return;
  }
  auto parsed_bytes =
      http_parser_execute(call->get_parser(), call->get_settings(),
                          call->get_buffer() + cur_pos, recv_bytes);
  if (parsed_bytes != static_cast<std::size_t>(recv_bytes)) {
    channel->close();
    return;
  }
  if (call->get_parser()->upgrade) {
    // 不支持升级协议
    channel->close();
    return;
  }
  call->adjust_buffer_length(recv_bytes);
}

int kratos::http::HttpBaseImpl::on_message_begin(http_parser *) { return 0; }

int kratos::http::HttpBaseImpl::on_message_complete(http_parser *parser) {
  if (parser->type == HTTP_REQUEST) {
    auto *response = reinterpret_cast<HttpResponse *>(parser->data);
    auto channel_id = response->get_channel_id();
    auto channel = response->get_base()->get_channel(channel_id);
    auto &call_map = response->get_base()->get_call_map();
    response->finish_parse();
    response->set_msg_complete();
    if (response->get_handler()) {
      auto handler = response->get_handler();
      auto &non_finish_map = response->get_base()->get_non_finish_map();
      auto it = call_map.find(channel->get_channel_name());
      if (it != call_map.end()) {
        if (handler(std::weak_ptr(it->second), response->get_response(), response->get_user_data())) {
          response->finish();
        }
      }
      // 添加到等待表，防止内存泄漏，防止长时间未关闭
      non_finish_map[it->first] = {
          std::weak_ptr(it->second),
          util::get_os_time_millionsecond() +
              response->get_base()->get_http_max_call_timeout() * 1000};
    } else {
      // 没有设置事件回调
      auto &coro_listener_map = response->get_base()->get_coro_listener_map();
      auto it = coro_listener_map.find(channel->get_coro_listener_name());
      if (it != coro_listener_map.end()) {
        const auto &call = call_map[channel->get_channel_name()];
        it->second.call = HttpCallPtr(call);
        coro_resume(it->second.coro_id);
      }
      response->close();
    }
  } else if (parser->type == HTTP_RESPONSE) {
    auto *request = reinterpret_cast<HttpRequest *>(parser->data);
    request->finish_parse();
    request->set_msg_complete();
    if (request->get_handler()) {
      auto handler = request->get_handler();
      auto channel_id = request->get_channel_id();
      auto channel = request->get_base()->get_channel(channel_id);
      const auto &call_map = request->get_base()->get_call_map();
      auto &non_finish_map = request->get_base()->get_non_finish_map();
      auto it = call_map.find(channel->get_channel_name());
      if (it != call_map.end()) {
        handler(std::weak_ptr(it->second), it->second->get_user_data());
        request->finish();
      }
      // 添加到等待表，防止内存泄漏，防止长时间未关闭
      non_finish_map[it->first] = {
          std::weak_ptr(it->second),
          util::get_os_time_millionsecond() +
              request->get_base()->get_http_max_call_timeout() * 1000};
    } else {
      // 没有设置事件回调
      if (request->is_coro()) {
        request->wakeup();
      }
      request->close();
    }
  }
  return 0;
}

int kratos::http::HttpBaseImpl::on_url(http_parser *parser, const char *at,
                                       std::size_t length) {
  if (parser->type == HTTP_REQUEST) {
    auto *response = reinterpret_cast<HttpResponse *>(parser->data);
    response->set_uri(std::string(at, length));
  } else if (parser->type == HTTP_RESPONSE) {
    auto *request = reinterpret_cast<HttpRequest *>(parser->data);
    request->set_uri(std::string(at, length));
  }
  return 0;
}

int kratos::http::HttpBaseImpl::on_header_field(http_parser *parser,
                                                const char *at,
                                                std::size_t length) {
  if (parser->type == HTTP_REQUEST) {
    auto *response = reinterpret_cast<HttpResponse *>(parser->data);
    response->add_header_key(std::string(at, length));
  } else if (parser->type == HTTP_RESPONSE) {
    auto *request = reinterpret_cast<HttpRequest *>(parser->data);
    request->add_header_key(std::string(at, length));
  }
  return 0;
}

int kratos::http::HttpBaseImpl::on_header_value(http_parser *parser,
                                                const char *at,
                                                std::size_t length) {
  if (parser->type == HTTP_REQUEST) {
    auto *response = reinterpret_cast<HttpResponse *>(parser->data);
    response->add_header_value(std::string(at, length));
  } else if (parser->type == HTTP_RESPONSE) {
    auto *request = reinterpret_cast<HttpRequest *>(parser->data);
    request->add_header_value(std::string(at, length));
  }
  return 0;
}

int kratos::http::HttpBaseImpl::on_status(http_parser *, const char *,
                                          std::size_t) {
  return 0;
}

int kratos::http::HttpBaseImpl::on_headers_complete(http_parser *) { return 0; }

int kratos::http::HttpBaseImpl::on_body(http_parser *parser, const char *at,
                                        std::size_t length) {
  if (parser->type == HTTP_REQUEST) {
    auto *response = reinterpret_cast<HttpResponse *>(parser->data);
    response->set_body_info(at, length);
  } else if (parser->type == HTTP_RESPONSE) {
    auto *request = reinterpret_cast<HttpRequest *>(parser->data);
    request->set_body_info(at, length);
  }
  return 0;
}

int kratos::http::HttpBaseImpl::on_chunk_header(http_parser *) { return 0; }

int kratos::http::HttpBaseImpl::on_chunk_complete(http_parser *) { return 0; }

auto kratos::http::HttpBaseImpl::gen_client_name() -> std::string {
  return "http_server" + std::to_string(name_index_++);
}

auto kratos::http::HttpBaseImpl::gen_connector_name() -> std::string {
  return "http_client" + std::to_string(name_index_++);
}

auto kratos::http::HttpBaseImpl::get_call_map() -> HttpCallMap & {
  return call_map_;
}

auto kratos::http::HttpBaseImpl::get_listener_map() -> ListenHandlerMap & {
  return listen_handler_map_;
}

auto kratos::http::HttpBaseImpl::get_non_finish_map() -> NonFinishMap & {
  return non_finish_map_;
}
